# -*- coding: utf-8 -*-
from airflow.operators.python import PythonOperator
from datetime import timedelta, datetime
from jms.dim import jms_dim__dim_cn_three_codes_change_dt
from jms.es import jms_es__cn_address_warehouse_day_rule2es
from jms.es.threecodes_update_esaddress.thrcodes_esaddr import jms_thrcodes_esaddr
from utils.alerts.es_threeCodeOnSuccess import es_threeCodeOnSuccess


def kwargs():
    """Return the settings mapping handed to ``es_threeCodeOnSuccess``.

    A new dict is created on every call, so callers may mutate the result
    without affecting later calls.

    Keys:
        db: database name.
        table: table name.
        desc: human-readable description (Chinese; roughly "outlets whose
            second-segment code update in the ES address library failed").
        dingding_conn_id: connection id, presumably for the DingDing
            notification hook -- confirm against ``es_threeCodeOnSuccess``.
    """
    return dict(
        db="jms_dim",
        table="dim_cn_second_codes_change_error_dt",
        desc="es地址库更新二段码失败的网点",
        dingding_conn_id="dingding_es_address",
    )


# jms_es__threecodes_update_esaddress = DummyOperator(
#     task_id='jms_es__threecodes_update_esaddress',
#     email='chenhongping@yl-scm.com',
#     retries=0,
#     priority_weight=0,
#     # sla=timedelta(hours=2),
# )

# Task that runs `jms_thrcodes_esaddr` through a PythonOperator.
jms_es__threecodes_update_esaddress = PythonOperator(
    task_id="jms_es__threecodes_update_esaddress",
    python_callable=jms_thrcodes_esaddr,
    # Two templated positional arguments for the callable: the execution date
    # passed through the `cst_ds_nodash` filter, and the `env` Airflow Variable.
    op_args=["{{ execution_date | cst_ds_nodash }}", "{{ var.value.env }}"],
    # Runs in the `threecode_es` pool and occupies 3 of its slots.
    pool="threecode_es",
    pool_slots=3,
    priority_weight=30,
    # Fixed 110-minute limit. A previous, now-disabled variant computed the
    # timeout as the time remaining until 07:00 (with a 1-second floor).
    execution_timeout=timedelta(minutes=110),
    retries=1,
    email=["yushuo@jtexpress.com", "yl_bigdata@yl-scm.com"],
    # NOTE: `es_threeCodeOnSuccess(...)` is called here, while this module is
    # being loaded; whatever it returns is what gets registered as the callback.
    on_success_callback=es_threeCodeOnSuccess(kwargs()),
)

# Both imported tasks are upstream of the ES address update task.
jms_es__threecodes_update_esaddress.set_upstream(
    [
        jms_es__cn_address_warehouse_day_rule2es,
        jms_dim__dim_cn_three_codes_change_dt,
    ]
)
